{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "This is a companion notebook for the book [Deep Learning with Python, Third Edition](https://www.manning.com/books/deep-learning-with-python-third-edition). For readability, it only contains runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode.\n\n**If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.**\n\nThe book's contents are available online at [deeplearningwithpython.io](https://deeplearningwithpython.io)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "# %pip (unlike !pip) installs into the environment of the running kernel.\n",
    "%pip install keras keras-hub --upgrade -q"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ[\"KERAS_BACKEND\"] = \"jax\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "cellView": "form",
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "# @title\n",
    "import os\n",
    "from IPython.core.magic import register_cell_magic\n",
    "\n",
    "@register_cell_magic\n",
    "def backend(line, cell):\n",
    "    \"\"\"Run `cell` only when KERAS_BACKEND matches the backend named in `line`.\n",
    "\n",
    "    The required backend is the last whitespace-separated word of the magic\n",
    "    line (e.g. `%%backend jax`). On a mismatch the cell is skipped and a\n",
    "    hint explaining how to switch backends is printed instead.\n",
    "    \"\"\"\n",
    "    words = line.split()\n",
    "    if not words:\n",
    "        # A bare `%%backend` would otherwise crash with IndexError below.\n",
    "        print(\"Usage: %%backend <backend name>\")\n",
    "        return\n",
    "    current, required = os.environ.get(\"KERAS_BACKEND\", \"\"), words[-1]\n",
    "    if current == required:\n",
    "        get_ipython().run_cell(cell)\n",
    "    else:\n",
    "        print(\n",
    "            f\"This cell requires the {required} backend. To run it, change KERAS_BACKEND to \"\n",
    "            f\"\\\"{required}\\\" at the top of the notebook, restart the runtime, and rerun the notebook.\"\n",
    "        )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "## Language models and the Transformer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### The language model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Training a Shakespeare language model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import keras\n",
    "\n",
    "filename = keras.utils.get_file(\n",
    "    origin=(\n",
    "        \"https://storage.googleapis.com/download.tensorflow.org/\"\n",
    "        \"data/shakespeare.txt\"\n",
    "    ),\n",
    ")\n",
    "# Read inside a `with` block so the file handle is closed right away, and\n",
    "# name the encoding so the read does not depend on the platform locale.\n",
    "with open(filename, \"r\", encoding=\"utf-8\") as f:\n",
    "    shakespeare = f.read()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "shakespeare[:250]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "sequence_length = 100\n",
    "\n",
    "def split_input(input, sequence_length):\n",
    "    \"\"\"Yield consecutive, non-overlapping slices of `input`.\n",
    "\n",
    "    Every slice holds `sequence_length` items except possibly the last one,\n",
    "    which holds whatever remains.\n",
    "    \"\"\"\n",
    "    total = len(input)\n",
    "    for start in range(0, total, sequence_length):\n",
    "        stop = start + sequence_length\n",
    "        yield input[start:stop]\n",
    "\n",
    "# The labels are the features shifted one character ahead, so each position\n",
    "# is paired with the character that follows it.\n",
    "features = [*split_input(shakespeare[:-1], sequence_length)]\n",
    "labels = [*split_input(shakespeare[1:], sequence_length)]\n",
    "dataset = tf.data.Dataset.from_tensor_slices((features, labels))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "x, y = next(dataset.as_numpy_iterator())\n",
    "x[:50], y[:50]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "from keras import layers\n",
    "\n",
    "tokenizer = layers.TextVectorization(\n",
    "    standardize=None,\n",
    "    split=\"character\",\n",
    "    output_sequence_length=sequence_length,\n",
    ")\n",
    "tokenizer.adapt(dataset.map(lambda text, labels: text))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "vocabulary_size = tokenizer.vocabulary_size()\n",
    "vocabulary_size"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "dataset = dataset.map(\n",
    "    lambda features, labels: (tokenizer(features), tokenizer(labels)),\n",
    "    num_parallel_calls=8,\n",
    ")\n",
    "# Cache the tokenized examples *before* shuffling: a cache placed after\n",
    "# `shuffle()` replays the first epoch's order (and batch composition) on\n",
    "# every later epoch, so the data would effectively be shuffled only once.\n",
    "training_data = dataset.cache().shuffle(10_000).batch(64)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "embedding_dim = 256\n",
    "hidden_dim = 1024\n",
    "\n",
    "inputs = layers.Input(shape=(sequence_length,), dtype=\"int\", name=\"token_ids\")\n",
    "x = layers.Embedding(vocabulary_size, embedding_dim)(inputs)\n",
    "x = layers.GRU(hidden_dim, return_sequences=True)(x)\n",
    "x = layers.Dropout(0.1)(x)\n",
    "outputs = layers.Dense(vocabulary_size, activation=\"softmax\")(x)\n",
    "model = keras.Model(inputs, outputs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "model.summary(line_length=80)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "model.compile(\n",
    "    optimizer=\"adam\",\n",
    "    loss=\"sparse_categorical_crossentropy\",\n",
    "    metrics=[\"sparse_categorical_accuracy\"],\n",
    ")\n",
    "model.fit(training_data, epochs=20)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Generating Shakespeare"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "inputs = keras.Input(shape=(1,), dtype=\"int\", name=\"token_ids\")\n",
    "input_state = keras.Input(shape=(hidden_dim,), name=\"state\")\n",
    "\n",
    "x = layers.Embedding(vocabulary_size, embedding_dim)(inputs)\n",
    "x, output_state = layers.GRU(hidden_dim, return_state=True)(\n",
    "    x, initial_state=input_state\n",
    ")\n",
    "outputs = layers.Dense(vocabulary_size, activation=\"softmax\")(x)\n",
    "generation_model = keras.Model(\n",
    "    inputs=(inputs, input_state),\n",
    "    outputs=(outputs, output_state),\n",
    ")\n",
    "generation_model.set_weights(model.get_weights())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "tokens = tokenizer.get_vocabulary()\n",
    "token_ids = range(vocabulary_size)\n",
    "char_to_id = dict(zip(tokens, token_ids))\n",
    "id_to_char = dict(zip(token_ids, tokens))\n",
    "\n",
    "prompt = \"\"\"\n",
    "KING RICHARD III:\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "input_ids = [char_to_id[c] for c in prompt]\n",
    "state = keras.ops.zeros(shape=(1, hidden_dim))\n",
    "for token_id in input_ids:\n",
    "    inputs = keras.ops.expand_dims([token_id], axis=0)\n",
    "    predictions, state = generation_model.predict((inputs, state), verbose=0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "generated_ids = []\n",
    "max_length = 250\n",
    "for i in range(max_length):\n",
    "    next_char = int(np.argmax(predictions, axis=-1)[0])\n",
    "    generated_ids.append(next_char)\n",
    "    inputs = keras.ops.expand_dims([next_char], axis=0)\n",
    "    predictions, state = generation_model.predict((inputs, state), verbose=0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "output = \"\".join([id_to_char[token_id] for token_id in generated_ids])\n",
    "print(prompt + output)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### Sequence-to-sequence learning"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### English-to-Spanish translation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import pathlib\n",
    "\n",
    "# Fetch over HTTPS (as the other download from this host does) so the\n",
    "# archive cannot be tampered with in transit.\n",
    "zip_path = keras.utils.get_file(\n",
    "    origin=(\n",
    "        \"https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip\"\n",
    "    ),\n",
    "    fname=\"spa-eng\",\n",
    "    extract=True,\n",
    ")\n",
    "text_path = pathlib.Path(zip_path) / \"spa-eng\" / \"spa.txt\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "# Decode as UTF-8 explicitly: the platform default encoding is not UTF-8\n",
    "# everywhere, and the Spanish sentences would be mis-decoded under it.\n",
    "with open(text_path, encoding=\"utf-8\") as f:\n",
    "    # The file ends with a newline, so drop the empty trailing entry.\n",
    "    lines = f.read().split(\"\\n\")[:-1]\n",
    "text_pairs = []\n",
    "for line in lines:\n",
    "    english, spanish = line.split(\"\\t\")\n",
    "    # Wrap each target sentence in explicit start/end marker tokens.\n",
    "    spanish = \"[start] \" + spanish + \" [end]\"\n",
    "    text_pairs.append((english, spanish))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import random\n",
    "random.choice(text_pairs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import random\n",
    "\n",
    "random.shuffle(text_pairs)\n",
    "val_samples = int(0.15 * len(text_pairs))\n",
    "train_samples = len(text_pairs) - 2 * val_samples\n",
    "train_pairs = text_pairs[:train_samples]\n",
    "val_pairs = text_pairs[train_samples : train_samples + val_samples]\n",
    "test_pairs = text_pairs[train_samples + val_samples :]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import string\n",
    "import re\n",
    "\n",
    "strip_chars = string.punctuation + \"\u00bf\"\n",
    "strip_chars = strip_chars.replace(\"[\", \"\")\n",
    "strip_chars = strip_chars.replace(\"]\", \"\")\n",
    "\n",
    "def custom_standardization(input_string):\n",
    "    lowercase = tf.strings.lower(input_string)\n",
    "    return tf.strings.regex_replace(\n",
    "        lowercase, f\"[{re.escape(strip_chars)}]\", \"\"\n",
    "    )\n",
    "\n",
    "vocab_size = 15000\n",
    "sequence_length = 20\n",
    "\n",
    "english_tokenizer = layers.TextVectorization(\n",
    "    max_tokens=vocab_size,\n",
    "    output_mode=\"int\",\n",
    "    output_sequence_length=sequence_length,\n",
    ")\n",
    "spanish_tokenizer = layers.TextVectorization(\n",
    "    max_tokens=vocab_size,\n",
    "    output_mode=\"int\",\n",
    "    output_sequence_length=sequence_length + 1,\n",
    "    standardize=custom_standardization,\n",
    ")\n",
    "train_english_texts = [pair[0] for pair in train_pairs]\n",
    "train_spanish_texts = [pair[1] for pair in train_pairs]\n",
    "english_tokenizer.adapt(train_english_texts)\n",
    "spanish_tokenizer.adapt(train_spanish_texts)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "batch_size = 64\n",
    "\n",
    "def format_dataset(eng, spa):\n",
    "    \"\"\"Tokenize a batch of sentence pairs into (features, labels, sample_weights).\n",
    "\n",
    "    The Spanish tokens are offset by one step: the decoder input drops the\n",
    "    last token and the labels drop the first, so each position is trained to\n",
    "    predict the next token. Sample weights are False wherever the label id\n",
    "    is 0, which keeps those positions out of the loss and weighted metrics.\n",
    "    \"\"\"\n",
    "    eng = english_tokenizer(eng)\n",
    "    spa = spanish_tokenizer(spa)\n",
    "    features = {\"english\": eng, \"spanish\": spa[:, :-1]}\n",
    "    labels = spa[:, 1:]\n",
    "    sample_weights = labels != 0\n",
    "    return features, labels, sample_weights\n",
    "\n",
    "def make_dataset(pairs):\n",
    "    \"\"\"Build a batched, tokenized tf.data pipeline from (english, spanish) pairs.\"\"\"\n",
    "    eng_texts, spa_texts = zip(*pairs)\n",
    "    eng_texts = list(eng_texts)\n",
    "    spa_texts = list(spa_texts)\n",
    "    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))\n",
    "    dataset = dataset.batch(batch_size)\n",
    "    dataset = dataset.map(format_dataset, num_parallel_calls=4)\n",
    "    # Cache first, then shuffle: caching after `shuffle()` would freeze the\n",
    "    # batch order of the first epoch and replay it on every later epoch.\n",
    "    return dataset.cache().shuffle(2048)\n",
    "\n",
    "train_ds = make_dataset(train_pairs)\n",
    "val_ds = make_dataset(val_pairs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "inputs, targets, sample_weights = next(iter(train_ds))\n",
    "print(inputs[\"english\"].shape)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "print(inputs[\"spanish\"].shape)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "print(targets.shape)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "print(sample_weights.shape)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Sequence-to-sequence learning with RNNs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "embed_dim = 256\n",
    "hidden_dim = 1024\n",
    "\n",
    "source = keras.Input(shape=(None,), dtype=\"int32\", name=\"english\")\n",
    "x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)\n",
    "rnn_layer = layers.GRU(hidden_dim)\n",
    "rnn_layer = layers.Bidirectional(rnn_layer, merge_mode=\"sum\")\n",
    "encoder_output = rnn_layer(x)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "target = keras.Input(shape=(None,), dtype=\"int32\", name=\"spanish\")\n",
    "x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(target)\n",
    "rnn_layer = layers.GRU(hidden_dim, return_sequences=True)\n",
    "x = rnn_layer(x, initial_state=encoder_output)\n",
    "x = layers.Dropout(0.5)(x)\n",
    "target_predictions = layers.Dense(vocab_size, activation=\"softmax\")(x)\n",
    "seq2seq_rnn = keras.Model([source, target], target_predictions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "seq2seq_rnn.summary(line_length=80)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "seq2seq_rnn.compile(\n",
    "    optimizer=\"adam\",\n",
    "    loss=\"sparse_categorical_crossentropy\",\n",
    "    weighted_metrics=[\"accuracy\"],\n",
    ")\n",
    "seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "spa_vocab = spanish_tokenizer.get_vocabulary()\n",
    "# Map token id -> token string for decoding model outputs.\n",
    "spa_index_lookup = dict(enumerate(spa_vocab))\n",
    "\n",
    "def generate_translation(input_sentence):\n",
    "    \"\"\"Greedily decode a Spanish translation of `input_sentence` with `seq2seq_rnn`.\n",
    "\n",
    "    Decoding starts from \"[start]\" and appends the highest-scoring token at\n",
    "    each step until \"[end]\" is produced or `sequence_length` steps have run.\n",
    "    Returns the decoded string, markers included.\n",
    "    \"\"\"\n",
    "    source_ids = english_tokenizer([input_sentence])\n",
    "    words = [\"[start]\"]\n",
    "    for step in range(sequence_length):\n",
    "        target_ids = spanish_tokenizer([\" \".join(words)])\n",
    "        predictions = seq2seq_rnn.predict([source_ids, target_ids], verbose=0)\n",
    "        # Output position `step` scores the token that follows the tokens\n",
    "        # decoded so far.\n",
    "        best_index = np.argmax(predictions[0, step, :])\n",
    "        word = spa_index_lookup[best_index]\n",
    "        words.append(word)\n",
    "        if word == \"[end]\":\n",
    "            break\n",
    "    return \" \".join(words)\n",
    "\n",
    "test_eng_texts = [pair[0] for pair in test_pairs]\n",
    "for _ in range(5):\n",
    "    input_sentence = random.choice(test_eng_texts)\n",
    "    print(\"-\")\n",
    "    print(input_sentence)\n",
    "    print(generate_translation(input_sentence))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### The Transformer architecture"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Dot-product attention"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Transformer encoder block"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "class TransformerEncoder(keras.Layer):\n",
    "    \"\"\"Encoder block: masked self-attention followed by a two-layer feed-forward net.\n",
    "\n",
    "    Each sub-block adds its input back (residual connection) and then\n",
    "    applies layer normalization.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, hidden_dim, intermediate_dim, num_heads):\n",
    "        super().__init__()\n",
    "        # Split the model width evenly across the attention heads.\n",
    "        self.self_attention = layers.MultiHeadAttention(\n",
    "            num_heads, hidden_dim // num_heads\n",
    "        )\n",
    "        self.self_attention_layernorm = layers.LayerNormalization()\n",
    "        self.feed_forward_1 = layers.Dense(intermediate_dim, activation=\"relu\")\n",
    "        self.feed_forward_2 = layers.Dense(hidden_dim)\n",
    "        self.feed_forward_layernorm = layers.LayerNormalization()\n",
    "\n",
    "    def call(self, source, source_mask):\n",
    "        # Add a query axis so the per-token mask broadcasts over every query\n",
    "        # position (assumes `source_mask` is (batch, sequence) -- TODO confirm).\n",
    "        attention_mask = source_mask[:, None, :]\n",
    "        attended = self.self_attention(\n",
    "            query=source, key=source, value=source, attention_mask=attention_mask\n",
    "        )\n",
    "        hidden = self.self_attention_layernorm(attended + source)\n",
    "        projected = self.feed_forward_2(self.feed_forward_1(hidden))\n",
    "        return self.feed_forward_layernorm(projected + hidden)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Transformer decoder block"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "class TransformerDecoder(keras.Layer):\n",
    "    \"\"\"Decoder block: causal self-attention, cross-attention over the source, feed-forward.\n",
    "\n",
    "    Each of the three sub-blocks adds its input back (residual connection)\n",
    "    and then applies layer normalization.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, hidden_dim, intermediate_dim, num_heads):\n",
    "        super().__init__()\n",
    "        # Split the model width evenly across the attention heads.\n",
    "        per_head_dim = hidden_dim // num_heads\n",
    "        self.self_attention = layers.MultiHeadAttention(num_heads, per_head_dim)\n",
    "        self.self_attention_layernorm = layers.LayerNormalization()\n",
    "        self.cross_attention = layers.MultiHeadAttention(num_heads, per_head_dim)\n",
    "        self.cross_attention_layernorm = layers.LayerNormalization()\n",
    "        self.feed_forward_1 = layers.Dense(intermediate_dim, activation=\"relu\")\n",
    "        self.feed_forward_2 = layers.Dense(hidden_dim)\n",
    "        self.feed_forward_layernorm = layers.LayerNormalization()\n",
    "\n",
    "    def call(self, target, source, source_mask):\n",
    "        # The causal mask keeps each target position from attending to later ones.\n",
    "        self_attended = self.self_attention(\n",
    "            query=target, key=target, value=target, use_causal_mask=True\n",
    "        )\n",
    "        hidden = self.self_attention_layernorm(self_attended + target)\n",
    "\n",
    "        # Add a query axis so the source mask broadcasts over target positions\n",
    "        # (assumes `source_mask` is (batch, source_length) -- TODO confirm).\n",
    "        cross_mask = source_mask[:, None, :]\n",
    "        cross_attended = self.cross_attention(\n",
    "            query=hidden, key=source, value=source, attention_mask=cross_mask\n",
    "        )\n",
    "        hidden = self.cross_attention_layernorm(cross_attended + hidden)\n",
    "\n",
    "        projected = self.feed_forward_2(self.feed_forward_1(hidden))\n",
    "        return self.feed_forward_layernorm(projected + hidden)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Sequence-to-sequence learning with a Transformer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "hidden_dim = 256\n",
    "intermediate_dim = 2048\n",
    "num_heads = 8\n",
    "\n",
    "source = keras.Input(shape=(None,), dtype=\"int32\", name=\"english\")\n",
    "x = layers.Embedding(vocab_size, hidden_dim)(source)\n",
    "encoder_output = TransformerEncoder(hidden_dim, intermediate_dim, num_heads)(\n",
    "    source=x,\n",
    "    source_mask=source != 0,\n",
    ")\n",
    "\n",
    "target = keras.Input(shape=(None,), dtype=\"int32\", name=\"spanish\")\n",
    "x = layers.Embedding(vocab_size, hidden_dim)(target)\n",
    "x = TransformerDecoder(hidden_dim, intermediate_dim, num_heads)(\n",
    "    target=x,\n",
    "    source=encoder_output,\n",
    "    source_mask=source != 0,\n",
    ")\n",
    "x = layers.Dropout(0.5)(x)\n",
    "target_predictions = layers.Dense(vocab_size, activation=\"softmax\")(x)\n",
    "transformer = keras.Model([source, target], target_predictions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "transformer.summary(line_length=80)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "transformer.compile(\n",
    "    optimizer=\"adam\",\n",
    "    loss=\"sparse_categorical_crossentropy\",\n",
    "    weighted_metrics=[\"accuracy\"],\n",
    ")\n",
    "transformer.fit(train_ds, epochs=15, validation_data=val_ds)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Embedding positional information"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "from keras import ops\n",
    "\n",
    "class PositionalEmbedding(keras.Layer):\n",
    "    \"\"\"Sum of a token embedding and a learned embedding of each token's position.\n",
    "\n",
    "    `sequence_length` is the number of distinct positions that can be\n",
    "    embedded, `input_dim` the token vocabulary size, and `output_dim` the\n",
    "    width of both embeddings.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, sequence_length, input_dim, output_dim):\n",
    "        super().__init__()\n",
    "        self.token_embeddings = layers.Embedding(input_dim, output_dim)\n",
    "        self.position_embeddings = layers.Embedding(sequence_length, output_dim)\n",
    "\n",
    "    def call(self, inputs):\n",
    "        # A running count of ones along the last axis, minus one, gives the\n",
    "        # indices 0, 1, 2, ... with the same shape as `inputs`.\n",
    "        ones = ops.ones_like(inputs)\n",
    "        positions = ops.cumsum(ones, axis=-1) - 1\n",
    "        return self.token_embeddings(inputs) + self.position_embeddings(positions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "hidden_dim = 256\n",
    "# Same feed-forward width as the Transformer built without positional\n",
    "# embeddings above (was 2056), so the two models differ only in how the\n",
    "# inputs are embedded.\n",
    "intermediate_dim = 2048\n",
    "num_heads = 8\n",
    "\n",
    "source = keras.Input(shape=(None,), dtype=\"int32\", name=\"english\")\n",
    "x = PositionalEmbedding(sequence_length, vocab_size, hidden_dim)(source)\n",
    "encoder_output = TransformerEncoder(hidden_dim, intermediate_dim, num_heads)(\n",
    "    source=x,\n",
    "    source_mask=source != 0,\n",
    ")\n",
    "\n",
    "target = keras.Input(shape=(None,), dtype=\"int32\", name=\"spanish\")\n",
    "x = PositionalEmbedding(sequence_length, vocab_size, hidden_dim)(target)\n",
    "x = TransformerDecoder(hidden_dim, intermediate_dim, num_heads)(\n",
    "    target=x,\n",
    "    source=encoder_output,\n",
    "    source_mask=source != 0,\n",
    ")\n",
    "x = layers.Dropout(0.5)(x)\n",
    "target_predictions = layers.Dense(vocab_size, activation=\"softmax\")(x)\n",
    "transformer = keras.Model([source, target], target_predictions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "transformer.compile(\n",
    "    optimizer=\"adam\",\n",
    "    loss=\"sparse_categorical_crossentropy\",\n",
    "    weighted_metrics=[\"accuracy\"],\n",
    ")\n",
    "transformer.fit(train_ds, epochs=30, validation_data=val_ds)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "spa_vocab = spanish_tokenizer.get_vocabulary()\n",
    "# Map token id -> token string for decoding model outputs.\n",
    "spa_index_lookup = dict(enumerate(spa_vocab))\n",
    "\n",
    "def generate_translation(input_sentence):\n",
    "    \"\"\"Greedily decode a Spanish translation of `input_sentence` with `transformer`.\n",
    "\n",
    "    Decoding starts from \"[start]\" and appends the highest-scoring token at\n",
    "    each step until \"[end]\" is produced or `sequence_length` steps have run.\n",
    "    Returns the decoded string, markers included.\n",
    "    \"\"\"\n",
    "    source_ids = english_tokenizer([input_sentence])\n",
    "    words = [\"[start]\"]\n",
    "    for step in range(sequence_length):\n",
    "        # Drop the last column so the decoder input has the same length the\n",
    "        # model saw in training (the tokenizer pads to one token more).\n",
    "        target_ids = spanish_tokenizer([\" \".join(words)])[:, :-1]\n",
    "        predictions = transformer.predict([source_ids, target_ids], verbose=0)\n",
    "        # Output position `step` scores the token that follows the tokens\n",
    "        # decoded so far.\n",
    "        best_index = np.argmax(predictions[0, step, :])\n",
    "        word = spa_index_lookup[best_index]\n",
    "        words.append(word)\n",
    "        if word == \"[end]\":\n",
    "            break\n",
    "    return \" \".join(words)\n",
    "\n",
    "test_eng_texts = [pair[0] for pair in test_pairs]\n",
    "for _ in range(5):\n",
    "    input_sentence = random.choice(test_eng_texts)\n",
    "    print(\"-\")\n",
    "    print(input_sentence)\n",
    "    print(generate_translation(input_sentence))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### Classification with a pretrained Transformer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Pretraining a Transformer encoder"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Loading a pretrained Transformer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import keras_hub\n",
    "\n",
    "tokenizer = keras_hub.models.Tokenizer.from_preset(\"roberta_base_en\")\n",
    "backbone = keras_hub.models.Backbone.from_preset(\"roberta_base_en\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "tokenizer(\"The quick brown fox\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "backbone.summary(line_length=80)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Preprocessing IMDb movie reviews"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "import os, pathlib, shutil, random\n",
    "\n",
    "zip_path = keras.utils.get_file(\n",
    "    origin=\"https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz\",\n",
    "    fname=\"imdb\",\n",
    "    extract=True,\n",
    ")\n",
    "\n",
    "imdb_extract_dir = pathlib.Path(zip_path) / \"aclImdb\"\n",
    "train_dir = pathlib.Path(\"imdb_train\")\n",
    "test_dir = pathlib.Path(\"imdb_test\")\n",
    "val_dir = pathlib.Path(\"imdb_val\")\n",
    "\n",
    "shutil.copytree(imdb_extract_dir / \"test\", test_dir, dirs_exist_ok=True)\n",
    "\n",
    "# Hold out this fraction of each class from the original training set for\n",
    "# validation.\n",
    "val_percentage = 0.2\n",
    "for category in (\"neg\", \"pos\"):\n",
    "    src_dir = imdb_extract_dir / \"train\" / category\n",
    "    # os.listdir() returns entries in arbitrary order, so sort before the\n",
    "    # seeded shuffle; otherwise the split is not reproducible, and a rerun\n",
    "    # could copy the same review into both the train and validation folders.\n",
    "    src_files = sorted(os.listdir(src_dir))\n",
    "    random.Random(1337).shuffle(src_files)\n",
    "    num_val_samples = int(len(src_files) * val_percentage)\n",
    "\n",
    "    os.makedirs(train_dir / category, exist_ok=True)\n",
    "    os.makedirs(val_dir / category, exist_ok=True)\n",
    "    for index, file in enumerate(src_files):\n",
    "        if index < num_val_samples:\n",
    "            shutil.copy(src_dir / file, val_dir / category / file)\n",
    "        else:\n",
    "            shutil.copy(src_dir / file, train_dir / category / file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "from keras.utils import text_dataset_from_directory\n",
    "\n",
    "batch_size = 16\n",
    "train_ds = text_dataset_from_directory(train_dir, batch_size=batch_size)\n",
    "val_ds = text_dataset_from_directory(val_dir, batch_size=batch_size)\n",
    "test_ds = text_dataset_from_directory(test_dir, batch_size=batch_size)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "def preprocess(text, label):\n",
    "    \"\"\"Tokenize `text` and pack it into fixed-length model inputs.\n",
    "\n",
    "    Returns a (features, label) pair where features holds the packed\n",
    "    \"token_ids\" and the matching \"padding_mask\"; `label` passes through\n",
    "    unchanged.\n",
    "    \"\"\"\n",
    "    # Adds the tokenizer's start/end tokens and pads each sequence to 512 ids.\n",
    "    packer = keras_hub.layers.StartEndPacker(\n",
    "        sequence_length=512,\n",
    "        start_value=tokenizer.start_token_id,\n",
    "        end_value=tokenizer.end_token_id,\n",
    "        pad_value=tokenizer.pad_token_id,\n",
    "        return_padding_mask=True,\n",
    "    )\n",
    "    tokenized = tokenizer(text)\n",
    "    packed_ids, mask = packer(tokenized)\n",
    "    features = {\"token_ids\": packed_ids, \"padding_mask\": mask}\n",
    "    return features, label\n",
    "\n",
    "preprocessed_train_ds = train_ds.map(preprocess)\n",
    "preprocessed_val_ds = val_ds.map(preprocess)\n",
    "preprocessed_test_ds = test_ds.map(preprocess)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "next(iter(preprocessed_train_ds))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "#### Fine-tuning a pretrained Transformer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "inputs = backbone.input\n",
    "x = backbone(inputs)\n",
    "x = x[:, 0, :]\n",
    "x = layers.Dropout(0.1)(x)\n",
    "x = layers.Dense(768, activation=\"relu\")(x)\n",
    "x = layers.Dropout(0.1)(x)\n",
    "outputs = layers.Dense(1, activation=\"sigmoid\")(x)\n",
    "classifier = keras.Model(inputs, outputs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "classifier.compile(\n",
    "    optimizer=keras.optimizers.Adam(5e-5),\n",
    "    loss=\"binary_crossentropy\",\n",
    "    metrics=[\"accuracy\"],\n",
    ")\n",
    "classifier.fit(\n",
    "    preprocessed_train_ds,\n",
    "    validation_data=preprocessed_val_ds,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "colab_type": "code"
   },
   "outputs": [],
   "source": [
    "classifier.evaluate(preprocessed_test_ds)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text"
   },
   "source": [
    "### What makes the Transformer effective?"
   ]
  }
 ],
 "metadata": {
  "accelerator": "GPU",
  "colab": {
   "collapsed_sections": [],
   "name": "chapter15_language-models-and-the-transformer",
   "private_outputs": false,
   "provenance": [],
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}